package org.niit.service

import java.io.{FileWriter, PrintWriter}
import java.text.SimpleDateFormat

import org.apache.spark.streaming.Seconds
import org.apache.spark.streaming.dstream.DStream
import org.niit.common.TService
import org.niit.bean.AdClickData

import scala.collection.mutable.ListBuffer

/**
 * Counts ad clicks per 10-second bucket over a sliding one-minute window and
 * dumps each window's result as a JSON array to `data/adclick.json`.
 */
class TimeCountService extends TService{

  /**
   * Registers the windowed click-count computation and its file output on the given stream.
   *
   * Each click's timestamp is rounded down to a multiple of 10000 (a 10-second bucket when the
   * timestamp is epoch milliseconds, which is how it is later handed to `java.util.Date`).
   * Buckets are summed over a 60-second window that slides every 10 seconds. For every window
   * the buckets are sorted by time, rendered as `{"xtime": ..., "yval": ...}` objects and
   * written, overwriting the previous content, to `data/adclick.json`.
   *
   * @param data stream of ad click records; only the `ts` field is read here
   * @return the result of registering the output operation (`Unit`)
   */
  override def dataAnalysis(data: DStream[AdClickData]): Any = {

    /*
     Ad click data for the last minute, recomputed every 10 seconds.
     Bucketing example (integer division drops the remainder):
       10:56 --> 10:50
       10:58 --> 10:50    58 / 10 = 5,  5 * 10 = 50
     */
    val reduceData = data.map(v => {
      // Timestamp of this ad click, truncated to the start of its 10-second bucket.
      val ts = v.ts.toLong
      val tsInt = ts / 10000 * 10000

      (tsInt, 1)
    }).reduceByKeyAndWindow(
      (x: Int, y: Int) => x + y,
      Seconds(60),
      Seconds(10)
    )

    reduceData.foreachRDD(rdd => {
      // One formatter per batch is enough; it is only used inside this closure,
      // so SimpleDateFormat's lack of thread-safety is not a concern.
      val sdf = new SimpleDateFormat("HH:mm:ss")

      // Sort ascending by bucket time, pull the (small) result to the driver and
      // render each bucket as one JSON object, e.g. { "xtime":"10:59:50", "yval":"88" }.
      val entries = rdd.sortByKey(true).collect().map {
        case (time, cnt) =>
          val date = sdf.format(new java.util.Date(time))
          s"""{"xtime": "${date} ", "yval":"${cnt}" } """
      }

      // Write the JSON array, always releasing the file handle even if rendering
      // or writing fails part-way.
      val out = new PrintWriter(new FileWriter("data/adclick.json"))
      try {
        out.println("[" + entries.mkString(",") + "]")
        out.flush()
      } finally {
        out.close()
      }
    })

  }
}


